Amazon MWAAに依存ライブラリを追加してみた
こんにちは。サービスグループの武田です。
本日発表されたAWSのマネージドAirflowであるAmazon MWAA(Managed Workflow for Apache Airflow)で遊んでいます。
今回は前回作ったAirflowの環境にライブラリを追加する手順を確認してみました。
ライブラリの前提
次のライブラリは追加の必要なく使用できます。
- Airflowが依存しているライブラリ
- AWSが追加しているライブラリ
そもそもAirflowが依存しているライブラリはインストールされているので、自分で追加する必要がありません。たとえばPandasやNumPy、Pendulumなどがあげられます。次に、AWSが追加しているライブラリが2つあり、psycopg2
とboto
はセットアップ済みとなっています。MWAAのメタデータストアとしておそらくPostgreSQLを使っているのと、botoはまぁAWS環境ですしね。
というわけで、それ以外のライブラリが別途必要な場合には、自分で追加する必要があります。
今回のゴール
今回はスクレイピングライブラリとして人気のあるBeautifulSoup
を追加して、Developers.IOのトップページからタイトルを抜き出してみます。
依存ライブラリがない状態を確認
まずはDAGの中で使用しているライブラリが環境にない場合を確認してみます。Airflowに追加するDAGとして次のスクリプトを用意しました。
import urllib.request import airflow from airflow.models import DAG from airflow.operators.python_operator import PythonOperator from bs4 import BeautifulSoup args = { "owner": "airflow", "start_date": airflow.utils.dates.days_ago(2), "provide_context": True, } def get_devio_top_page(url, **context): return urllib.request.urlopen(url).read().decode("utf-8") def print_title(html, **context): soup = BeautifulSoup(html, "html.parser") print(soup.find("title")) with DAG( dag_id="scraping_test", default_args=args, schedule_interval=None, ) as dag: t1 = PythonOperator( task_id="task1", python_callable=get_devio_top_page, op_kwargs={"url": "https://dev.classmethod.jp/"}, ) t2 = PythonOperator( task_id="task2", python_callable=print_title, op_kwargs={"html": '{{ ti.xcom_pull(task_ids="task1") }}'}, ) t1 >> t2
task1
はDevIOのトップページを取得しているタスクです。標準ライブラリのurllib
を使っているごく普通のプログラムです。task2
がBeautifulSoup
を使ってページのタイトルを抜き出してログに出す処理をしています。ページの中身は前のタスクで取得しているので、xcom経由で受け渡しています。
このファイルをS3にアップロードします。
しばらく待ってからAirflowの管理画面を開くと次のようにエラーメッセージが出ていました。
bs4
のモジュールがないよってことでエラーになっています。それではrequirements.txt
を用意して環境にライブラリを追加していきましょう。
依存ライブラリを追加してみた
MWAAの環境に依存ライブラリを追加するには、まずそれが書かれているrequirements.txt
を用意します。今回必要となったのはBeautifulSoup
だけですので、それを記述します。
beautifulsoup4==4.9.3
現時点ではPEP 508に準拠はしておらず、pip freeze
での生成は非推奨とのことです。
用意したファイルはS3バケットにアップロードします。MWAAがアクセスできれば任意の場所で問題ないはずです。今回はdags
フォルダーと同じ階層に置きました。
マネジメントコンソールでMWAAにアクセスし、更新する環境にチェックを入れて[編集]をクリックします。
要件ファイルの項目で、先ほどアップロードしたrequirements.txt
を選択します。ほかは変更せずに[保存]をクリックします。
環境の更新が始まります。
約20分ほどで更新が完了しました。環境クラスやWorkerインスタンスの台数によっては、この時間が前後するかもしれません。
先ほどエラーが出ていたAirflowの管理画面をもう一度開き、ページをリロードしてみます。今度は問題なくDAGが追加されていました!
DAGを有効にして手動で実行してみると、タスクが実行されました。
task2のログを確認してみると、DevIOのタイトルが出力されています!
まとめ
Pythonには豊富なライブラリがそろっていますが、インストールしなければ使えません。MWAAの環境でも問題なくライブラリを追加できますので、安心してDAGを構築できそうですね。